package com.fintech.pangu.security.authorize;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * REST-based synchronizer for the RBAC authorization cache.
 *
 * <p>After {@link #start(int, int)} is called the synchronizer performs one full sync,
 * schedules a self-rescheduling periodic full sync on a single-threaded scheduler, and
 * starts a long-polling loop on a dedicated thread that triggers an on-demand sync
 * whenever the remote endpoint answers with HTTP 200.
 *
 * <p>{@link #stop()} is terminal: the executors are created once in the constructor and
 * cannot be reused after they have been shut down.
 */
@Slf4j
public class RestRbacAuthorizeCacheSynchronizer implements AuthorizeCacheSynchronizer, Runnable {

    // Client-side long-polling read timeout (ms); meant to be longer than the server-side timeout.
    // NOTE(review): not referenced anywhere in this class yet.
    private static final int LONG_POLLING_READ_TIMEOUT = 90 * 1000;

    // Lower bound (ms) for the pause after a failed long-polling request, so that a
    // persistent failure combined with a zero interval cannot become a busy loop.
    private static final int LONG_POLLING_MIN_BACKOFF_MS = 1000;

    // Runs the periodic sync and on-demand syncs; one thread, so they never overlap.
    private final ScheduledExecutorService scheduler;

    // Runs the long-polling loop.
    private final ExecutorService longPollingService;

    private final AtomicBoolean started = new AtomicBoolean(false);  // whether the synchronizer is running
    private final AtomicBoolean longPollingStopped = new AtomicBoolean(false); // whether long polling must stop
    private int initialDelayMs;  // delay before the first periodic sync (ms)
    private int intervalMs;  // delay between periodic syncs (ms)

    // Handle of the most recently scheduled periodic sync, so it can be cancelled.
    private final AtomicReference<Future<?>> scheduledPeriodicRef;

    @Autowired
    private RestTemplate restTemplate;

    /**
     * Creates both executors with named daemon threads; nothing runs until
     * {@link #start(int, int)} is called.
     */
    public RestRbacAuthorizeCacheSynchronizer(){
        // Periodic full sync
        this.scheduler = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder()
                        .setNameFormat("Rest-RbacAuthorizeCachePeriodicSync-%d")
                        .setDaemon(true)
                        .build());

        // Long-polling sync
        this.longPollingService = Executors.newSingleThreadExecutor(
                new ThreadFactoryBuilder()
                        .setNameFormat("Rest-RbacAuthorizeCacheLongPollingSync-%d")
                        .setDaemon(true)
                        .build());

        this.scheduledPeriodicRef = new AtomicReference<Future<?>>();
    }

    /**
     * Schedules the first periodic sync after {@code initialDelayMs} milliseconds.
     */
    private void schedulePeriodicSync(){
        scheduleNextPeriodicSync(this.initialDelayMs);
    }

    /**
     * Schedules the next periodic sync and remembers its handle.
     * Does nothing when the synchronizer is not running or the scheduler is shut down.
     *
     * @param delayMs delay before the next sync, in milliseconds
     */
    private void scheduleNextPeriodicSync(int delayMs) {
        if (!started.get() || scheduler.isShutdown()) {
            log.debug("Synchronizer is not running, periodic sync is not rescheduled.");
            return;
        }
        try {
            // The delay fields hold milliseconds, so the unit must be MILLISECONDS.
            Future<?> next = scheduler.schedule(this, delayMs, TimeUnit.MILLISECONDS);
            scheduledPeriodicRef.set(next);
        }
        catch (RejectedExecutionException e) {
            // stop() shut the scheduler down between the check above and schedule().
            log.debug("Scheduler already shut down, periodic sync is not rescheduled.");
        }
    }

    /**
     * Submits the long-polling task: wait {@code intervalMs} milliseconds, then poll in a loop.
     */
    private void scheduleLongPollingSync(){
        longPollingService.submit(new Runnable() {
            @Override
            public void run() {
                log.debug("Long polling will start in {} ms.", intervalMs);
                if (!sleepUninterrupted(intervalMs)) {
                    // Interrupted while waiting: treat it as a stop request.
                    return;
                }

                doLongPollingSync();
            }
        });
    }

    /**
     * Sleeps for the given time, restoring the interrupt flag if interrupted.
     *
     * @param millis time to sleep, in milliseconds
     * @return {@code true} if the sleep completed, {@code false} if the thread was interrupted
     */
    private boolean sleepUninterrupted(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
            return true;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Long-polling loop: posts to the remote endpoint until stopped or interrupted and
     * triggers an on-demand sync on every HTTP 200 response. A failed request is logged
     * and retried after a pause instead of terminating the loop.
     */
    private void doLongPollingSync() {
        while (!longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
            try {
                // TODO: the endpoint URL is still an empty placeholder
                ResponseEntity<String> responseEntity = restTemplate.postForEntity("", null, String.class);

                if (responseEntity.getStatusCode().value() == 200) {
                    // Trigger an on-demand full sync
                    this.onDemandSync();
                }
            }
            catch (Exception e) {
                if (longPollingStopped.get()) {
                    break;
                }
                int backoffMs = Math.max(intervalMs, LONG_POLLING_MIN_BACKOFF_MS);
                log.warn("Rbac authorize cache long polling failed, retrying in {} ms", backoffMs, e);
                if (!sleepUninterrupted(backoffMs)) {
                    break;
                }
            }
        }
    }

    /**
     * Starts synchronization. Only the first call has an effect; later calls while
     * running are ignored.
     *
     * @param initialDelayMs delay before the first periodic sync, in milliseconds
     * @param intervalMs     delay between periodic syncs, also used as the wait before
     *                       long polling begins, in milliseconds
     * @throws IllegalStateException if {@link #stop()} has already been called
     */
    @Override
    public void start(int initialDelayMs, int intervalMs) {
        if (scheduler.isShutdown()) {
            // Fail before doing any work rather than with a RejectedExecutionException halfway through.
            throw new IllegalStateException("RestRbacAuthorizeCacheSynchronizer cannot be started after stop()");
        }

        if (started.compareAndSet(false, true)) {
            this.intervalMs = intervalMs;
            this.initialDelayMs = initialDelayMs;

            // 1. Full sync right away
            this.sync();

            // 2. Start the periodic sync
            this.schedulePeriodicSync();

            // 3. Start long polling
            this.scheduleLongPollingSync();
        }
    }

    /**
     * Stops synchronization: ends the long-polling loop, cancels the pending periodic
     * sync and shuts both executors down. A sync that is already executing is allowed
     * to finish.
     */
    @Override
    public void stop() {
        started.set(false);

        longPollingStopped.set(true);
        longPollingService.shutdownNow();

        // Without this the already-queued delayed sync would still run after shutdown().
        Future<?> pending = scheduledPeriodicRef.getAndSet(null);
        if (pending != null) {
            pending.cancel(false);
        }
        scheduler.shutdown();
    }

    /**
     * Periodic sync task: runs a full sync, logs any failure, and schedules the next
     * run {@code intervalMs} milliseconds later while the synchronizer is running.
     */
    @Override
    public void run() {
        try {
            // Full sync
            this.sync();
        }
        catch (Exception e){
            log.error("Rbac权限定时同步异常", e);
        }
        finally {
            scheduleNextPeriodicSync(intervalMs);
        }
    }

    /**
     * Full sync. Currently this only logs that a sync is starting.
     */
    @Override
    public void sync() {
        log.info("rbac authorize cache sync starting ......");
    }


    /**
     * On-demand full sync: on the scheduler thread, cancels the pending periodic sync
     * (if it has not completed) and runs {@link #run()}, which syncs and reschedules.
     * The request is logged and dropped if the scheduler has been shut down.
     */
    @Override
    public void onDemandSync() {
        try {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    log.info("手工执行Rbac权限缓存同步");

                    Future<?> latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        log.info("取消最近的定时同步，手工同步后会再次启动定时同步");
                        latestPeriodic.cancel(false);
                    }

                    RestRbacAuthorizeCacheSynchronizer.this.run();
                }
            });
        }
        catch (RejectedExecutionException e) {
            log.warn("Scheduler already shut down, on-demand Rbac authorize cache sync is skipped");
        }
    }

}
